Python系列 之 socket模块 您所在的位置:网站首页 python3 selectors Python系列 之 socket模块

Python系列 之 socket模块

#Python系列 之 socket模块| 来源: 网络整理| 查看: 265

Python socket模块学习 socket模块学习非阻塞模式select模块selectors模块

socket模块学习 非阻塞模式

socket的默认情况下是阻塞模式:socket.accept()方法在没有接受到连接之前不能处理已经建立连接的其他操作,以及在recv()方法或者其他接受数据的方法时候都是阻塞的,如果没有接受到数据就会一直处于阻塞状态,来等待接受数据,这种情况只有通过开启新的进程或者线程来解决来自不同客户端的连接请求或者接受数据;socket可以支持非阻塞的模式;可以使用以下两种方法来设置socket的非阻塞模式:

# 设置套接字为阻塞或非阻塞模式:如果 flag 为 false,则将套接字设置为非阻塞,否则设置为阻塞。 # socket.setblocking(flag) # 如果value赋为 0,则套接字将处于非阻塞模式。如果指定为 None,则套接字将处于阻塞模式。 # socket.settimeout(value) # 阻塞 sock.setblocking(True) sock.settimeout(None) # 非阻塞 sock.setblocking(False) sock.settimeout(0.0)

在非阻塞模式下可以实现在单线程模式下实现与多个客户端连接的交互:

非阻塞模式的服务端:

# demo_socket_server_2.py文件 import logging import socket logging.basicConfig(level=logging.DEBUG, format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S") logger = logging.getLogger(__name__) class ServerClass(object): """docstring for ServerClass""" def __init__(self): self.__HOST = "127.0.0.1" self.__PORT = 9999 self.ADDR = (self.__HOST, self.__PORT) self.__TCP_SOCKET = socket.socket( family=socket.AF_INET, type=socket.SOCK_STREAM) # 设置非阻塞 # self.__TCP_SOCKET.setblocking(False) self.__TCP_SOCKET.settimeout(0.0) # 用来存放套接字对象的列表 self.connlist = list() def start_server(self): with self.__TCP_SOCKET as sock: sock.bind(self.ADDR) sock.listen() logger.info("Server is Running") while True: try: conn, addr = sock.accept() # logger.info(conn) # 将连接的套接字对象设置为非阻塞 conn.setblocking(False) msg = f"Hi,{addr}" self.send_data(conn, msg) # 添加到列表 self.connlist.append(conn) # 如果没有连接进来需要捕获BlockingIOError异常 except BlockingIOError as e: pass # logger.debug("没有新的客户端连接") # 循环套接字对象列表 进行收发数据 for conn in self.connlist: msg = self.recv_data(conn) self.send_data(conn, msg) def recv_data(self, conn): """接收数据""" try: msg = conn.recv(1024).decode("utf-8") if not msg or msg in ["quit"]: logger.debug("断开连接") # 将套接字对象从列表移除 self.connlist.remove(conn) else: logger.info(msg) return msg except IOError as e: pass # logger.debug("没有接收到数据") def send_data(self, conn, msg): """发送数据""" if msg: msg = f"From Server {msg}" try: conn.sendall(msg.encode("utf-8")) except ConnectionResetError as e: pass logger.debug("连接已断开,无法再发送信息") if __name__ == '__main__': ServerClass().start_server()

服务端非阻塞模式的情况下 主要通过循环控制不停的去捕获BlockingIOError 异常来判断是否有新的连接进来,或者是是否有数据可以接受到;该情况下CPU的使用率会很高

客户端代码:

import logging import time import socket import threading # demo_logging_1.load_loggingconfig() logging.basicConfig(level=logging.DEBUG, format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S") logger = logging.getLogger(__name__) class ClientClass(object): """docstring for ClientClass""" def __init__(self): self.__HOST = "127.0.0.1" self.__PORT = 9999 self.__ADDR = (self.__HOST, self.__PORT) self.__TCP_SOCKET = socket.socket( family=socket.AF_INET, type=socket.SOCK_STREAM) def start_client(self): """启动客户端""" with self.__TCP_SOCKET as sock: # 链接服务端地址 sock.connect(self.__ADDR) logger.info("%s" % sock.recv(1024).decode("utf-8")) recv_t = threading.Thread( target=self.recv_data, args=(sock,)) # 向服务端发送数据 send_t = threading.Thread( target=self.send_data, args=(sock,)) # 接收数据线程设置为守护线程 recv_t.setDaemon(True) recv_t.start() send_t.start() send_t.join() def send_data(self, sock): while True: send_data = input() sock.sendall(send_data.encode("utf-8")) # 如果输入 quit 或者 exit 断开连接 if send_data in ("quit"): logger.info("正在退出...") break def recv_data(self, sock): while True: try: recv_data = sock.recv(1024).decode("utf-8") logger.info(recv_data) except Exception as e: pass # logger.error(e, exc_info=True) time.sleep(0.5) break if __name__ == '__main__': ClientClass().start_client()

效果: 在这里插入图片描述

服务端不需要多开启进程或者线程就可以实现与多个客户端之间通信

select模块

把上面非阻塞的服务端修改下,利用select模块的select方法对套接字对象进行监控;

# select.select(rlist, wlist, xlist[, timeout]) rlist:等待,直到可以开始读取wlist:等待,直到可以开始写入xlist:等待“异常情况”可选的 timeout 参数以一个浮点数表示超时秒数

服务端:

# demo_socket_server_3.py文件 import select import logging import socket import queue logging.basicConfig(level=logging.DEBUG, format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S") logger = logging.getLogger(__name__) class ServerClass(object): """docstring for ServerClass""" def __init__(self): self.__HOST = "127.0.0.1" self.__PORT = 9999 self.ADDR = (self.__HOST, self.__PORT) self.__TCP_SOCKET = socket.socket( family=socket.AF_INET, type=socket.SOCK_STREAM) # 设置非阻塞 # self.__TCP_SOCKET.setblocking(False) self.__TCP_SOCKET.settimeout(0.0) # 用来存放套接字对象的列表 self.inputlist = list() self.outputlist = list() # 存放客户端发送过来的数据 self.msg_dict = dict() def start_server(self): with self.__TCP_SOCKET as sock: sock.bind(self.ADDR) sock.listen() logger.info("Server is Running") # 将套接字对象添加到列表中 self.inputlist.append(sock) while True: rlist, wlist, xlist = select.select( self.inputlist, self.outputlist, self.inputlist) for r_conn in rlist: # 如果套接字对象是self.__TCP_SOCKET;表示有新的连接进来了需要接受 if r_conn is sock: conn, addr = r_conn.accept() logger.info(f"{addr} 已连接") # 和连接的客户端打个招呼 conn.sendall(f"Hi,{addr}".encode("utf-8")) # 设置为非阻塞 conn.setblocking(False) # 将新的连接添加到套接字对象列表进行监控 self.inputlist.append(conn) # 每接受一个连接: 将连接作为键,一个空队列作为值 self.msg_dict[conn] = queue.Queue() # 否则 是已经建立的连接发送过来数据了 需要接受数据 else: client_addr = r_conn.getpeername() try: recv_data = r_conn.recv(1024) # 如果有数据 接收;数据存储,将该连接添加到self.outputlist准备下一步发送数据 if recv_data and recv_data.decode("utf-8") != "quit": logger.info( f"接收到来自 {client_addr} 的数据:{recv_data.decode('utf-8')}") # msg_dict: 连接为 字典键 接收到的数据放到队列 为 字典值 self.msg_dict[r_conn].put(recv_data) if r_conn not in self.outputlist: self.outputlist.append(r_conn) # 否则 证明该连接已经断开了 else: logger.info(f"{client_addr} 已断开") # 执行清除 self.clear_conn(r_conn) except ConnectionResetError as e: # 捕获 ConnectionResetError 表示 客户端断开 logger.info(f"{client_addr} 异常断开") # 执行清除 self.clear_conn(r_conn) for w_conn in wlist: try: if w_conn in self.msg_dict: msg = self.msg_dict[w_conn].get(False) except queue.Empty as e: # 数据队列为空表示 该连接没有发送数据 服务端没有接收到 pass else: try: w_conn.sendall(b'From Server ' + msg) except ConnectionResetError as e: # 捕获ConnectionResetError 客户端断开 执行清除 self.clear_conn(w_conn) for e_conn in xlist: e_conn.close() # 执行清除 self.clear_conn(e_conn) def clear_conn(self, conn): """清除已经断开的连接""" if conn in self.inputlist: self.inputlist.remove(conn) if conn in self.outputlist: self.outputlist.remove(conn) if conn in self.msg_dict: del self.msg_dict[conn] if __name__ == '__main__': ServerClass().start_server()

现在设定的socket对象虽然是非阻塞的,但是因为select方法的作用,使用起来好像和阻塞的没有什么区别,是因为select把socket监控起来

rlist, wlist, xlist = select.select(self.inputlist, self.outputlist, self.inputlist)

如果在inputlist,outpulist中有活动的socket对象就会返回在rlist,wlist,xlist中;然后循环调用每个list去完成逻辑处理。 该方法同样是可以在单线程模式下实现服务端对多个客户端之间进行通信;

客户端代码:同上;

效果展示: 在这里插入图片描述

selectors模块

selectors模块是高级 I/O 复用库,它建立在 select 模块原型的基础之上。Python文档推荐用户改用此模块。

# 默认的选择器类,使用当前平台上可用的最高效选择器的实现 sel = selectors.DefaultSelector() # 注册一个用于选择的文件对象,在其上监视 I/O 事件 sel.register(fileobj, events, data=None) # 注销对一个文件对象的选择,移除对它的监视 sel.unregister(fileobj) # 等待直到有已注册的文件对象就绪,或是超过时限 # 返回由 (key, events) 元组构成的列表,每项各表示一个就绪的文件对象,key 是对应于就绪文件对象的 SelectorKey 实例。 events 是在此文件对象上等待的事件位掩码。 events = sel.select()

selectors.SelectorKey类 用来将文件对象关联到其下层的文件描述器、选定事件掩码和附加数据等;有以下属性:

名称说明fileobj已注册的文件对象fd下层的文件描述器events此文件对象上被等待的事件data可选的关联到此文件对象的不透明数据:例如,这可被用来存储各个客户端的会话 ID

events 是一个位掩码,说明哪些 I/O 事件要在给定的文件对象上执行等待。 它可以是以下模块级常量的组合:

名称说明selectors.EVENT_READ可读selectors.EVENT_WRITE可写

服务端代码示例:

import selectors import logging import socket logging.basicConfig(level=logging.DEBUG, format="%(asctime)s>%(message)s", datefmt="%Y-%m-%d %H:%M:%S") logger = logging.getLogger(__name__) class ServerClass(object): """docstring for ServerClass""" def __init__(self): self.__HOST = "127.0.0.1" self.__PORT = 9999 self.ADDR = (self.__HOST, self.__PORT) self.__TCP_SOCKET = socket.socket( family=socket.AF_INET, type=socket.SOCK_STREAM) # 设置非阻塞 # self.__TCP_SOCKET.setblocking(False) self.__TCP_SOCKET.settimeout(0.0) self.sele = selectors.DefaultSelector() def start_server(self): with self.__TCP_SOCKET as sock: sock.bind(self.ADDR) sock.listen() logger.info("Server is Running") self.sele.register( sock, selectors.EVENT_READ, self.accept_conn) while True: events = self.sele.select() for key, mask in events: callback = key.data callback(key.fileobj, mask) def accept_conn(self, sock, mask): conn, addr = sock.accept() logger.info(f"{addr} 已连接") conn.sendall(f"Hi,{addr}".encode("utf-8")) conn.setblocking(False) self.sele.register(conn, selectors.EVENT_READ, self.read) def read(self, conn, mask): try: client_addr = conn.getpeername() recv_data = conn.recv(1024) if recv_data and recv_data.decode("utf-8") != "quit": logger.info( f"接收到来自 {client_addr} 的数据:{recv_data.decode('utf-8')}") conn.sendall(b'From Server ' + recv_data) else: logger.info(f"{client_addr} 已断开") self.sele.unregister(conn) except ConnectionResetError as e: logger.info(f"{client_addr} 异常断开") self.sele.unregister(conn) conn.close() if __name__ == '__main__': ServerClass().start_server()

比select模块的select方法更简洁一些;

客户端代码 :同上; 效果展示: 在这里插入图片描述 和select方法结果是一样的; 主要参考: 一只小小寄居蟹 --这个博客 Python 中文文档 selectors — 高级 I/O 复用库

以上就是所有关于Python socket模块的非阻塞模式的学习 如果有什么不对的地方,欢迎指正!



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有